Skip to content

feat: implement asynchronous audit logging with RabbitMQ and DLQ#100

Merged
Junman140 merged 2 commits intoPi-Defi-world:mainfrom
N-thnI:feature/rabbitmq-audit-system
Mar 26, 2026
Merged

feat: implement asynchronous audit logging with RabbitMQ and DLQ#100
Junman140 merged 2 commits intoPi-Defi-world:mainfrom
N-thnI:feature/rabbitmq-audit-system

Conversation

@N-thnI
Copy link
Copy Markdown
Contributor

@N-thnI N-thnI commented Mar 26, 2026

closes #56

Summary by CodeRabbit

  • New Features

    • Audit logging now uses a queued architecture with a dedicated audit consumer and dead-letter handling
    • Added automatic retry with exponential backoff for failed audit entries
    • Implemented fallback file storage for audit entries when queue publish fails
    • Added configurable alert email to notify on critical audit logging failures
  • Tests

    • Added unit tests covering audit producer, consumer, retry and fallback behaviors

@drips-wave
Copy link
Copy Markdown

drips-wave bot commented Mar 26, 2026

@N-thnI Great news! 🎉 Based on an automated assessment of this PR, the linked Wave issue(s) no longer count against your application limits.

You can now already apply to more issues while waiting for a review of this PR. Keep up the great work! 🚀

Learn more about application limits

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 26, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 81737b7e-1b0b-413a-8f1c-03eec81831ea

📥 Commits

Reviewing files that changed from the base of the PR and between 98da1d3 and 939fe3e.

📒 Files selected for processing (3)
  • src/jobs/auditConsumer.test.ts
  • src/jobs/auditConsumer.ts
  • src/services/audit/auditService.test.ts

📝 Walkthrough

Walkthrough

Introduces RabbitMQ-backed audit pipeline: publishes audit events to an AUDIT_LOGS queue, adds a consumer with retry and DLQ handling, adds local fallback and alert email on publish failures, and wires consumer startup into server initialization.

Changes

Cohort / File(s) Summary
Configuration
src/config/env.ts, src/config/rabbitmq.ts
Added notification.alertEmail config (`process.env.NOTIFICATION_ALERT_EMAIL
Server bootstrap
src/index.ts
Starts startAuditConsumer() during RabbitMQ-ready startup sequence.
Audit producer service
src/services/audit/auditService.ts, src/services/audit/auditService.test.ts
Replaced direct Prisma writes with publishing to AUDIT_LOGS; added fallback handleAuditFailure that appends lost-audits.log and conditionally sends alert email using config.notification.alertEmail; tests cover success, publish failure, and channel error paths.
Audit consumer
src/jobs/auditConsumer.ts, src/jobs/auditConsumer.test.ts
New consumer startAuditConsumer() that asserts queue with DLQ, consumes JSON messages, retries persistence with exponential backoff, acks on success, and nacks to DLQ after retries; tests cover happy path and retry-to-DLQ behavior.
Misc tests
tests/simple.test.ts
Added a trivial passing test placeholder.

Sequence Diagram(s)

sequenceDiagram
    participant App as Application
    participant AS as AuditService
    participant RMQ as RabbitMQ
    participant AC as AuditConsumer
    participant DB as Database
    participant DLQ as AUDIT_LOGS_DLQ
    participant Fallback as Fallback Handler

    rect rgba(76, 175, 80, 0.5)
    note over App,DB: Happy path
    App->>AS: logAudit(entry)
    AS->>RMQ: sendToQueue(AUDIT_LOGS, payload, persistent)
    RMQ-->>AS: true
    AS-->>App: return
    RMQ->>AC: deliver(message)
    AC->>DB: prisma.auditTrail.create(parsed)
    DB-->>AC: success
    AC->>RMQ: ack(message)
    end

    rect rgba(255, 152, 0, 0.5)
    note over App,Fallback: Publish failure -> fallback
    App->>AS: logAudit(entry)
    AS->>RMQ: sendToQueue(AUDIT_LOGS, payload, persistent)
    RMQ-->>AS: false
    AS->>Fallback: handleAuditFailure(entry + error)
    Fallback->>Fallback: append lost-audits.log
    Fallback->>Fallback: sendEmail(if configured)
    AS-->>App: return
    end

    rect rgba(244, 67, 54, 0.5)
    note over AC,DLQ: Consumer retries -> DLQ
    RMQ->>AC: deliver(message)
    AC->>AC: retry loop (exponential backoff)
    AC->>DB: prisma.auditTrail.create()
    DB-->>AC: error (after retries)
    AC->>RMQ: nack(message, false, false) -> routed to DLQ
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Poem

🐰 I hopped a log into the stream,

Through queues and retries, steady beam,
If brokers fail, I stash and tell,
A tiny email rings the bell,
No audit lost — the bunny's dream. 🥕📬

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 75.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Out of Scope Changes check ❓ Inconclusive The simple test case in tests/simple.test.ts (asserting 1===1) appears unrelated to audit logging implementation and may represent incomplete test scaffolding. Clarify the purpose of the simple test case or remove it if it was added accidentally during development.
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically summarizes the main change: implementing asynchronous audit logging with RabbitMQ and DLQ, which is the primary objective of the changeset.
Linked Issues check ✅ Passed The PR fully addresses issue #56 requirements: retry logic with exponential backoff [auditConsumer.ts], fallback file persistence with alerting [auditService.ts], and graceful DLQ routing for failed messages.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/index.ts`:
- Around line 108-110: The call to startAuditConsumer inside startServer can
throw (e.g., from getRabbitMQChannel/queue assertions) and currently lives in
the outer try/catch, causing the whole service to exit on audit-queue failure;
change this so audit consumer startup is isolated: move the dynamic
import("./jobs/auditConsumer")/startAuditConsumer invocation out of the main
startServer critical path and wrap it in its own try/catch (or start it without
awaiting) so any exception from startAuditConsumer or getRabbitMQChannel is
caught, logged (including error details) and the server continues running
without queue-based features.

In `@src/jobs/auditConsumer.ts`:
- Around line 17-18: The handler currently calls JSON.parse on msg.content
outside the retry/DLQ try/catch so malformed payloads escape ack/nack handling;
move the parsing of msg.content (the lines that create content and entry) inside
the existing try/catch used for processing, and in the catch ensure you call
msg.nack() (or msg.ack() as appropriate for fatal vs retryable) so malformed
messages are nacked and follow the retry/DLQ path; update the same pattern for
the other block referenced (lines 39-55) so every JSON.parse is wrapped and
leads to msg.nack() on parse errors.

In `@src/services/audit/auditService.ts`:
- Around line 65-98: The current try block that writes the fallback file also
contains the sendEmail call, so if fs.appendFileSync or mkdirSync fails the
alert email is skipped; refactor the block in auditService.ts so that
sendEmail(config.notification.alertEmail, subject, body) is executed outside the
filesystem try (or placed into a finally) — keep building subject/body using
entry and error as shown, wrap the sendEmail call in its own catch to log
failures via logger.error, and ensure fallbackPath/logger messages remain in the
file-write try while email alerting is always attempted regardless of filesystem
errors.
- Around line 59-63: The logger.error call (and the alert email path) currently
embeds the full audit entry object; create and use a sanitizer (e.g.
redactSensitiveFields or sanitizeAuditEntry) that returns a copy of entry with
sensitive keys removed or replaced (e.g. strip or mask oldValue/newValue/any
payload fields) and pass that sanitized object to logger.error and to the
email-sending routine (the alert email body path). Update the code paths that
reference entry (the logger.error call and the alert email construction) to use
the sanitized version instead of the raw entry to avoid leaking sensitive audit
payloads.
- Around line 30-45: The current use of channel.sendToQueue(QUEUES.AUDIT_LOGS,
...) relies on the write-buffer boolean and must be replaced with publisher
confirms: create a ConfirmChannel via connection.createConfirmChannel() and
publish using ConfirmChannel.sendToQueue with the callback form sendToQueue(...,
{ persistent: true, mandatory: true }, (err, ok) => { ... }) so you only treat
the message as delivered on callback success (err === null); also attach a
channel 'return' event handler to detect unroutable messages and handle/alert
accordingly; update any logic in the audit publishing path that currently throws
on a boolean false to instead wait for the confirm callback and handle
err/return events.

In `@tests/simple.test.ts`:
- Around line 1-5: The current test only asserts a trivial value and doesn't
exercise the audit logging system; replace the trivial spec with
unit/integration tests that exercise the real code paths: call the
AuditPublisher.publishAuditEvent (or equivalent) to assert it enqueues to
RabbitMQClient.publish, simulate publish failures to verify the retry logic on
publishRetry, assert messages routed to DLQHandler.handleDeadLetter after
exceeded retries, verify FileFallback.persist (or fallbackToFile) is invoked
when RabbitMQ is unavailable, and mock sendCriticalAlert (or
AlertService.notifyCritical) to confirm alerting on persistent failures; use
spies/mocks for RabbitMQClient, DLQHandler, FileFallback, and AlertService to
assert behavior and add an integration-style test that runs the async flow
end-to-end with a test RabbitMQ or in-memory emulator.
- Around line 1-2: The test file uses single quotes which violate the project's
Prettier rules; update string delimiters in the test by replacing single quotes
with double quotes in the describe and it calls (e.g., the "describe" and "it"
string literals) so the file uses double-quoted strings consistently per the
project's Prettier configuration.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7c84ffff-51ef-457e-9479-5ed01ec9c20f

📥 Commits

Reviewing files that changed from the base of the PR and between a47ddc5 and 98da1d3.

📒 Files selected for processing (8)
  • src/config/env.ts
  • src/config/rabbitmq.ts
  • src/index.ts
  • src/jobs/auditConsumer.test.ts
  • src/jobs/auditConsumer.ts
  • src/services/audit/auditService.test.ts
  • src/services/audit/auditService.ts
  • tests/simple.test.ts

Comment on lines +108 to +110
// Start audit consumer (AUDIT_LOGS → database)
const { startAuditConsumer } = await import("./jobs/auditConsumer");
await startAuditConsumer();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Isolate audit-consumer startup failure from API startup.

This await sits inside the outer startServer() try/catch, so any getRabbitMQChannel()/queue-assertion failure from startAuditConsumer() now takes the whole service down at Lines 188-190. That contradicts the earlier “continue without queue-based features” behavior and turns an audit-queue misconfiguration into a full API outage.

Suggested fix
-      const { startAuditConsumer } = await import("./jobs/auditConsumer");
-      await startAuditConsumer();
+      try {
+        const { startAuditConsumer } = await import("./jobs/auditConsumer");
+        await startAuditConsumer();
+      } catch (error) {
+        logger.error(
+          "Audit consumer failed to start; continuing with fallback-only audit persistence.",
+          error,
+        );
+      }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Start audit consumer (AUDIT_LOGS → database)
const { startAuditConsumer } = await import("./jobs/auditConsumer");
await startAuditConsumer();
// Start audit consumer (AUDIT_LOGS → database)
try {
const { startAuditConsumer } = await import("./jobs/auditConsumer");
await startAuditConsumer();
} catch (error) {
logger.error(
"Audit consumer failed to start; continuing with fallback-only audit persistence.",
error,
);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/index.ts` around lines 108 - 110, The call to startAuditConsumer inside
startServer can throw (e.g., from getRabbitMQChannel/queue assertions) and
currently lives in the outer try/catch, causing the whole service to exit on
audit-queue failure; change this so audit consumer startup is isolated: move the
dynamic import("./jobs/auditConsumer")/startAuditConsumer invocation out of the
main startServer critical path and wrap it in its own try/catch (or start it
without awaiting) so any exception from startAuditConsumer or getRabbitMQChannel
is caught, logged (including error details) and the server continues running
without queue-based features.

Comment on lines +17 to +18
const content = msg.content.toString();
const entry = JSON.parse(content);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Nack malformed messages instead of bypassing the DLQ path.

JSON.parse() happens before the retry/DLQ try/catch, so a bad payload exits the handler before either ack() or nack() runs. That leaves poison messages outside the intended DLQ flow.

Suggested fix
-    const content = msg.content.toString();
-    const entry = JSON.parse(content);
+    let entry: any;
+    try {
+      entry = JSON.parse(msg.content.toString());
+    } catch (error: any) {
+      logger.error("Invalid audit payload, moving message to DLQ", {
+        error: error.message || error,
+      });
+      channel.nack(msg, false, false);
+      return;
+    }

Also applies to: 39-55

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/jobs/auditConsumer.ts` around lines 17 - 18, The handler currently calls
JSON.parse on msg.content outside the retry/DLQ try/catch so malformed payloads
escape ack/nack handling; move the parsing of msg.content (the lines that create
content and entry) inside the existing try/catch used for processing, and in the
catch ensure you call msg.nack() (or msg.ack() as appropriate for fatal vs
retryable) so malformed messages are nacked and follow the retry/DLQ path;
update the same pattern for the other block referenced (lines 39-55) so every
JSON.parse is wrapped and leads to msg.nack() on parse errors.

Comment on lines +30 to +45
const success = channel.sendToQueue(
QUEUES.AUDIT_LOGS,
Buffer.from(JSON.stringify(payload)),
{ persistent: true },
);

if (success) {
logger.debug("Audit entry published to queue", {
eventType: entry.eventType,
entityType: entry.entityType ?? undefined,
entityId: entry.entityId ?? undefined,
action: entry.action,
oldValue: entry.oldValue ?? undefined,
newValue: entry.newValue ?? undefined,
performedBy: entry.performedBy ?? undefined,
},
});
return;
}

// If queue is full or other internal issue, treat as failure
throw new Error("RabbitMQ sendToQueue returned false");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

❓ Verification inconclusive

Script executed:

cat -n src/services/audit/auditService.ts | sed -n '20,50p'

Repository: Pi-Defi-world/acbu-backend


Repository: Pi-Defi-world/acbu-backend
Exit code: 0

stdout:

    20	 * Falls back to local file and alerting if RabbitMQ is unavailable.
    21	 */
    22	export async function logAudit(entry: AuditEntry): Promise<void> {
    23	  try {
    24	    const channel = getRabbitMQChannel();
    25	    const payload = {
    26	      ...entry,
    27	      timestamp: new Date().toISOString(),
    28	    };
    29	
    30	    const success = channel.sendToQueue(
    31	      QUEUES.AUDIT_LOGS,
    32	      Buffer.from(JSON.stringify(payload)),
    33	      { persistent: true },
    34	    );
    35	
    36	    if (success) {
    37	      logger.debug("Audit entry published to queue", {
    38	        eventType: entry.eventType,
    39	        action: entry.action,
    40	      });
    41	      return;
    42	    }
    43	
    44	    // If queue is full or other internal issue, treat as failure
    45	    throw new Error("RabbitMQ sendToQueue returned false");
    46	  } catch (error: any) {
    47	    logger.warn("RabbitMQ audit publish failed, using fallback", {
    48	      error: error.message || error,
    49	      eventType: entry.eventType,
    50	    });

amqplib sendToQueue return value boolean backpressure vs delivery


In amqplib, the channel.sendToQueue(queue, content, [options], [callback]) method returns a boolean value indicating local flow control (backpressure), not delivery success to the broker. - true: The message was accepted into the channel's output buffer; you can continue sending immediately. - false: The channel's write buffer is full (backpressure). Wait for the channel's 'drain' event before sending more messages to avoid memory exhaustion. This boolean is unrelated to whether the message reaches the queue or broker. Even if false is returned, the message is buffered internally by amqplib and will be sent when possible (assuming no crash). Do not resend the message; just pause publishing until 'drain'. For actual delivery confirmation (broker acknowledgment that the message was processed/stored): - Use a ConfirmChannel (from connection.createConfirmChannel). - Pass a callback as the last argument: ch.sendToQueue(..., function(err, ok) { ... }). - err: Non-null if delivery failed (e.g., unroutable with mandatory: true). - ok: true if successfully delivered to broker. - Or use ch.waitForConfirms after sending (for all pending messages). - Listen for 'return' events on unroutable mandatory messages. sendToQueue bypasses exchanges (direct to queue), so no routing issues unless queue doesn't exist (use mandatory: true and 'return' handler to detect). Examples: // Regular channel - only backpressure boolean let ok = ch.sendToQueue('q', Buffer.from('msg')); if (!ok) { // Wait for 'drain' await new Promise(resolve => ch.once('drain', resolve)); } // ConfirmChannel - delivery callback ch.sendToQueue('q', Buffer.from('msg'), {}, (err, ok) => { if (err) console.error('Delivery failed'); else console.log('Delivered'); });

Citations:


amqplib ConfirmChannel publisher confirms vs Channel sendToQueue


In amqplib, Channel.sendToQueue and ConfirmChannel publisher confirms serve different purposes for reliable message publishing to RabbitMQ. Channel.sendToQueue (on a regular Channel): - Publishes directly to a specific queue (bypassing exchange routing). - Returns boolean: true (keep sending) or false (wait for 'drain' event due to buffer full). - No broker confirmation of receipt/persistence - fire-and-forget, message can be lost on network failure or broker issues. - Behaves like stream.Writable for flow control. ConfirmChannel (created via connection.createConfirmChannel): - Enables RabbitMQ publisher confirms (broker ACK/NACK after message handled). - sendToQueue and publish accept optional callback(err, ok): called on broker confirm (ACK=ok, NACK=err). - Also has waitForConfirms: waits for all outstanding publishes since last call (or channel open) to be confirmed. - Provides at-least-once delivery guarantee when used with persistent messages and durable queues. Key differences: - Regular Channel: Local flow control only, no broker ACK. - ConfirmChannel: Adds broker-level confirmation for data safety. Usage example (Node.js): // Regular channel - no confirms const channel = await conn.createChannel; channel.sendToQueue('queue', Buffer.from('msg')); // Confirm channel - with confirms const confirmChannel = await conn.createConfirmChannel; confirmChannel.sendToQueue('queue', Buffer.from('msg'), {}, (err, ok) => { if (err) console.error('NACK'); else console.log('ACK'); }); // Or wait for batch confirmChannel.sendToQueue('queue', Buffer.from('msg')); await confirmChannel.waitForConfirms; For production reliability, use ConfirmChannel with persistent: true option.

Citations:


Script executed:

# Check if ConfirmChannel is used anywhere in the codebase
rg "ConfirmChannel" --type typescript

Repository: Pi-Defi-world/acbu-backend


Repository: Pi-Defi-world/acbu-backend
Exit code: 2

stderr:

unrecognized file type: typescript

Script executed:

# Check RabbitMQ channel initialization and confirm settings
cat -n src/config/rabbitmq.ts

Repository: Pi-Defi-world/acbu-backend


Repository: Pi-Defi-world/acbu-backend
Exit code: 0

stdout:

     1	import amqp, { Channel, ChannelModel } from "amqplib";
     2	import { config } from "./env";
     3	import { logger } from "./logger";
     4	
     5	let connection: ChannelModel | null = null;
     6	let channel: Channel | null = null;
     7	
     8	export async function connectRabbitMQ(): Promise<Channel> {
     9	  if (channel) {
    10	    return channel;
    11	  }
    12	
    13	  try {
    14	    connection = await amqp.connect(config.rabbitmqUrl);
    15	    const ch = await connection.createChannel();
    16	    channel = ch;
    17	    logger.info("RabbitMQ connected successfully");
    18	
    19	    await ch.assertExchange(EXCHANGES.DLX, "direct", { durable: true });
    20	    logger.info("Dead letter exchange asserted");
    21	
    22	    // Handle connection errors
    23	    connection.on("error", (err: Error) => {
    24	      logger.error("RabbitMQ connection error", err);
    25	    });
    26	
    27	    connection.on("close", () => {
    28	      logger.warn("RabbitMQ connection closed");
    29	      connection = null;
    30	      channel = null;
    31	    });
    32	
    33	    return ch;
    34	  } catch (error) {
    35	    logger.error("Failed to connect to RabbitMQ", error);
    36	    throw error;
    37	  }
    38	}
    39	
    40	export interface QueueOptions extends amqp.Options.AssertQueue {
    41	  deadLetterQueue?: string;
    42	}
    43	
    44	export async function assertQueueWithDLQ(
    45	  queueName: string,
    46	  options: QueueOptions = {},
    47	): Promise<amqp.Replies.AssertQueue> {
    48	  const ch = getRabbitMQChannel();
    49	  const dlqName = `${queueName}_dlq`;
    50	
    51	  await ch.assertQueue(dlqName, { durable: true });
    52	
    53	  await ch.bindQueue(dlqName, EXCHANGES.DLX, queueName);
    54	
    55	  const queueOptions: amqp.Options.AssertQueue = {
    56	    ...options,
    57	    durable: true,
    58	    arguments: {
    59	      ...options.arguments,
    60	      deadLetterExchange: EXCHANGES.DLX,
    61	      deadLetterRoutingKey: queueName,
    62	    },
    63	  };
    64	
    65	  return ch.assertQueue(queueName, queueOptions);
    66	}
    67	
    68	export async function disconnectRabbitMQ(): Promise<void> {
    69	  if (channel) {
    70	    await channel.close();
    71	    channel = null;
    72	  }
    73	  if (connection) {
    74	    await connection.close();
    75	    connection = null;
    76	    logger.info("RabbitMQ disconnected");
    77	  }
    78	}
    79	
    80	export function getRabbitMQChannel(): Channel {
    81	  if (!channel) {
    82	    throw new Error("RabbitMQ not connected. Call connectRabbitMQ() first.");
    83	  }
    84	  return channel;
    85	}
    86	
    87	// Queue names
    88	export const QUEUES = {
    89	  USDC_CONVERSION: "usdc_conversion",
    90	  USDC_CONVERSION_DLQ: "usdc_conversion_dlq",
    91	  WITHDRAWAL_PROCESSING: "withdrawal_processing",
    92	  WITHDRAWAL_PROCESSING_DLQ: "withdrawal_processing_dlq",
    93	  REBALANCING: "rebalancing",
    94	  REBALANCING_DLQ: "rebalancing_dlq",
    95	  NOTIFICATIONS: "notifications",
    96	  NOTIFICATIONS_DLQ: "notifications_dlq",
    97	  OTP_SEND: "otp_send", // OTP delivery (email/SMS) via worker
    98	  OTP_SEND_DLQ: "otp_send_dlq",
    99	  WEBHOOKS: "webhooks",
   100	  WEBHOOKS_DLQ: "webhooks_dlq", // failed webhook deliveries for retry
   101	  KYC_PROCESSING: "kyc_processing",
   102	  KYC_PROCESSING_DLQ: "kyc_processing_dlq",
   103	  WALLET_ACTIVATION: "wallet_activation", // send XLM to user wallet when KYC fee paid
   104	  WALLET_ACTIVATION_DLQ: "wallet_activation_dlq",
   105	  ACBU_SAVINGS_VAULT_EVENTS: "acbu_savings_vault_events",
   106	  ACBU_SAVINGS_VAULT_EVENTS_DLQ: "acbu_savings_vault_events_dlq",
   107	  ACBU_LENDING_POOL_EVENTS: "acbu_lending_pool_events",
   108	  ACBU_LENDING_POOL_EVENTS_DLQ: "acbu_lending_pool_events_dlq",
   109	  ACBU_ESCROW_EVENTS: "acbu_escrow_events",
   110	  ACBU_ESCROW_EVENTS_DLQ: "acbu_escrow_events_dlq",
   111	  XLM_TO_ACBU: "xlm_to_acbu", // XLM deposit: sell XLM and mint ACBU to user
   112	  XLM_TO_ACBU_DLQ: "xlm_to_acbu_dlq",
   113	  USDC_CONVERT_AND_MINT: "usdc_convert_and_mint", // USDC deposit: convert USDC→XLM (backend), then mint
   114	  USDC_CONVERT_AND_MINT_DLQ: "usdc_convert_and_mint_dlq",
   115	  AUDIT_LOGS: "audit_logs",
   116	  AUDIT_LOGS_DLQ: "audit_logs_dlq",
   117	} as const;
   118	
   119	// Exchange names
   120	export const EXCHANGES = {
   121	  RESERVE_EVENTS: "reserve_events",
   122	  TRANSACTION_EVENTS: "transaction_events",
   123	  DLX: "dlx", // Dead letter exchange
   124	} as const;

Script executed:

# Check if mandatory or return options are used
rg "mandatory|return" src/services/audit/ --type typescript

Repository: Pi-Defi-world/acbu-backend


Repository: Pi-Defi-world/acbu-backend
Exit code: 2

stderr:

unrecognized file type: typescript

Script executed:

# Check if there are any uses of ConfirmChannel elsewhere in codebase
rg "ConfirmChannel|createConfirmChannel" --type ts

Repository: Pi-Defi-world/acbu-backend


Repository: Pi-Defi-world/acbu-backend
Exit code: 1


Script executed:

# Check if callback or waitForConfirms is used anywhere
rg "waitForConfirms|sendToQueue.*function|sendToQueue.*callback" --type ts

Repository: Pi-Defi-world/acbu-backend


Repository: Pi-Defi-world/acbu-backend
Exit code: 1


Use ConfirmChannel with publisher confirms for audit log delivery guarantees.

The sendToQueue() return value only reflects local write-buffer backpressure, not broker acknowledgement. true means "accepted into output buffer", false means "wait for drain"—neither confirms the broker received or persisted the message. This causes two problems:

  1. The code triggers fallback logging/alerts when merely experiencing backpressure (false positive), potentially duplicating audit records unnecessarily.
  2. The code treats a successful write-buffer accept (true) as broker delivery, missing messages lost on network failures or broker issues.

To ensure delivery: switch to ConfirmChannel via connection.createConfirmChannel(), use the callback signature sendToQueue(..., {}, (err, ok) => {...}) for per-message confirmation, and add mandatory: true with a 'return' event handler to detect unroutable publishes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/services/audit/auditService.ts` around lines 30 - 45, The current use of
channel.sendToQueue(QUEUES.AUDIT_LOGS, ...) relies on the write-buffer boolean
and must be replaced with publisher confirms: create a ConfirmChannel via
connection.createConfirmChannel() and publish using ConfirmChannel.sendToQueue
with the callback form sendToQueue(..., { persistent: true, mandatory: true },
(err, ok) => { ... }) so you only treat the message as delivered on callback
success (err === null); also attach a channel 'return' event handler to detect
unroutable messages and handle/alert accordingly; update any logic in the audit
publishing path that currently throws on a boolean false to instead wait for the
confirm callback and handle err/return events.

Comment on lines +59 to +63
logger.error("CRITICAL: Audit logging failed (RabbitMQ unavailable)", {
eventType: entry.eventType,
error: error.message || error,
entry,
});
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Redact audit payloads before logging or emailing failures.

entry can carry arbitrary oldValue/newValue objects, and this path copies the full payload into general error logs and the alert email body. That widens access to potentially sensitive audit contents far beyond the audit store itself.

Also applies to: 83-90

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/services/audit/auditService.ts` around lines 59 - 63, The logger.error
call (and the alert email path) currently embeds the full audit entry object;
create and use a sanitizer (e.g. redactSensitiveFields or sanitizeAuditEntry)
that returns a copy of entry with sensitive keys removed or replaced (e.g. strip
or mask oldValue/newValue/any payload fields) and pass that sanitized object to
logger.error and to the email-sending routine (the alert email body path).
Update the code paths that reference entry (the logger.error call and the alert
email construction) to use the sanitized version instead of the raw entry to
avoid leaking sensitive audit payloads.

Comment on lines +65 to +98
try {
const logDir = path.dirname(config.logFile);
if (!fs.existsSync(logDir)) {
fs.mkdirSync(logDir, { recursive: true });
}

const fallbackPath = path.join(logDir, "lost-audits.log");
const fallbackEntry = {
timestamp: new Date().toISOString(),
...entry,
error: error instanceof Error ? error.message : String(error),
};

fs.appendFileSync(fallbackPath, JSON.stringify(fallbackEntry) + "\n");
logger.info(`Audit entry saved to fallback file: ${fallbackPath}`);

// Alert admin
if (config.notification.alertEmail) {
const subject = `CRITICAL: Audit Log System Failure - ${entry.eventType}`;
const body =
`Audit logging failed to publish to RabbitMQ.\n\n` +
`Event Type: ${entry.eventType}\n` +
`Action: ${entry.action}\n` +
`Error: ${error.message || error}\n\n` +
`The audit entry has been saved to the fallback file: ${fallbackPath}\n\n` +
`Entry Data: ${JSON.stringify(entry, null, 2)}`;

sendEmail(config.notification.alertEmail, subject, body).catch((e) => {
logger.error("Failed to send audit failure alert email", {
error: e.message || e,
});
});
}
} catch (fallbackError: any) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Still send the out-of-band alert when the file fallback fails.

sendEmail() is inside the same try as appendFileSync(), so disk/permission errors skip the only external alert and collapse the worst-case audit outage into another local log line. Move alerting outside the file-write try (or into a finally) so the “RabbitMQ down + fallback unavailable” case is still surfaced.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/services/audit/auditService.ts` around lines 65 - 98, The current try
block that writes the fallback file also contains the sendEmail call, so if
fs.appendFileSync or mkdirSync fails the alert email is skipped; refactor the
block in auditService.ts so that sendEmail(config.notification.alertEmail,
subject, body) is executed outside the filesystem try (or placed into a finally)
— keep building subject/body using entry and error as shown, wrap the sendEmail
call in its own catch to log failures via logger.error, and ensure
fallbackPath/logger messages remain in the file-write try while email alerting
is always attempted regardless of filesystem errors.

Comment on lines +1 to +5
describe('Simple', () => {
it('should pass', () => {
expect(1).toBe(1);
});
});
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Inconsistent with PR objectives: trivial test provides no coverage for the audit logging system.

The PR objectives describe implementing an asynchronous audit logging system with RabbitMQ, DLQ, retry logic, file fallback, and email alerts. However, this test file only contains a trivial assertion (expect(1).toBe(1)) that provides no actual test coverage for any of those features.

Consider adding meaningful tests that verify:

  • Audit event publishing to RabbitMQ
  • Retry logic for failed publishes
  • Dead-letter queue handling
  • File fallback persistence
  • Critical alert notifications
🧰 Tools
🪛 ESLint

[error] 1-1: Replace 'Simple' with "Simple"

(prettier/prettier)


[error] 2-2: Replace 'should·pass' with "should·pass"

(prettier/prettier)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/simple.test.ts` around lines 1 - 5, The current test only asserts a
trivial value and doesn't exercise the audit logging system; replace the trivial
spec with unit/integration tests that exercise the real code paths: call the
AuditPublisher.publishAuditEvent (or equivalent) to assert it enqueues to
RabbitMQClient.publish, simulate publish failures to verify the retry logic on
publishRetry, assert messages routed to DLQHandler.handleDeadLetter after
exceeded retries, verify FileFallback.persist (or fallbackToFile) is invoked
when RabbitMQ is unavailable, and mock sendCriticalAlert (or
AlertService.notifyCritical) to confirm alerting on persistent failures; use
spies/mocks for RabbitMQClient, DLQHandler, FileFallback, and AlertService to
assert behavior and add an integration-style test that runs the async flow
end-to-end with a test RabbitMQ or in-memory emulator.

Comment on lines +1 to +2
describe('Simple', () => {
it('should pass', () => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix prettier formatting violations.

Replace single quotes with double quotes to match the project's prettier configuration.

🎨 Proposed fix
-describe('Simple', () => {
-  it('should pass', () => {
+describe("Simple", () => {
+  it("should pass", () => {
     expect(1).toBe(1);
   });
 });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
describe('Simple', () => {
it('should pass', () => {
describe("Simple", () => {
it("should pass", () => {
expect(1).toBe(1);
});
});
🧰 Tools
🪛 ESLint

[error] 1-1: Replace 'Simple' with "Simple"

(prettier/prettier)


[error] 2-2: Replace 'should·pass' with "should·pass"

(prettier/prettier)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/simple.test.ts` around lines 1 - 2, The test file uses single quotes
which violate the project's Prettier rules; update string delimiters in the test
by replacing single quotes with double quotes in the describe and it calls
(e.g., the "describe" and "it" string literals) so the file uses double-quoted
strings consistently per the project's Prettier configuration.

@Junman140
Copy link
Copy Markdown
Member

@N-thnI cl failed

@N-thnI N-thnI closed this Mar 26, 2026
@N-thnI
Copy link
Copy Markdown
Contributor Author

N-thnI commented Mar 26, 2026

closes #56

@N-thnI N-thnI reopened this Mar 26, 2026
@Junman140 Junman140 merged commit 91997d9 into Pi-Defi-world:main Mar 26, 2026
1 of 2 checks passed
@Junman140
Copy link
Copy Markdown
Member

@N-thnI your code is still failing
Check the cl

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Audit failures are silent

2 participants